查看原文
其他

在 Spark SQL 和 Spark Structured Streaming 中使用 Pulsar

Growth ApachePulsar 2021-10-18

作者 | yjshen

审校 | Anonymitaet

编辑 | Susan


阅读本文需要约 15 分钟。



Pulsar Spark Connector 在 2019 年 7 月 9 日开源,源代码与用户指南参见 

https://github.com/streamnative/pulsar-spark


配置环境


以下示例使用 Homebrew 包管理器在 macOS 下载和安装软件,你可以根据自身需求和操作系统选择其他包管理器。


1

安装 Homebrew。

/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

2

安装 Java 8 或更高版本。本示例使用 Homebrew 安装 JDK8。

brew tap adoptopenjdk/openjdk

brew cask install adoptopenjdk8

3

安装 Apache Spark 2.4.0 或更高版本。从官网下载 Spark 2.4.3 并解压,链接如下:

https://www.apache.org/dyn/closer.lua/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

tar xvfz spark-2.4.3-bin-hadoop2.7.tgz

4

下载 Apache Pulsar 2.4.0。从官网下载 Pulsar 2.4.0,链接如下:

https://pulsar.apache.org/en/download/

wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz

tar xvfz apache-pulsar-2.4.0-bin.tar.gz

5

安装 Apache Maven。

brew install maven

6

设置开发环境。本示例创建一个名为 connector-test 的 Maven 工程。


(1) 使用 Scala Maven Plugin 提供的 archetype 构建一个 Scala 项目的框架,Scala Maven Plugin 链接如下:

http://davidb.github.io/scala-maven-plugin/

mvn archetype:generate

在出现的列表中选择 net.alchim31.maven:scala-archetype-simple 的最新版本,当前为 1.7,并为新工程指定 groupId、artifactId 和 version。


本示例使用的是:

groupId: com.example

artifactId: connector-test

version: 1.0-SNAPSHOT

经过以上步骤,一个 Maven 的 Scala 项目框架就基本搭建好了。


(2) 在项目根目录下的 pom.xml 中引入 Spark、Pulsar Spark Connector 依赖, 并使用 maven_shade_plugin 进行项目打包。


a. 定义依赖包的版本信息。

<properties>

<maven.compiler.source>1.8</maven.compiler.source>

<maven.compiler.target>1.8</maven.compiler.target>

<encoding>UTF-8</encoding>

<scala.version>2.11.12</scala.version>

<scala.compat.version>2.11</scala.compat.version>

<spark.version>2.4.3</spark.version>

<pulsar-spark-connector.version>2.4.0</pulsar-spark-connector.version>

<spec2.version>4.2.0</spec2.version>

<maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>

</properties>

b. 引入 Spark、Pulsar Spark Connector 依赖。

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_${scala.compat.version}</artifactId>

<version>${spark.version}</version>

<scope>provided</scope>

</dependency>


<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_${scala.compat.version}</artifactId>

<version>${spark.version}</version>

<scope>provided</scope>

</dependency>


<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-catalyst_${scala.compat.version}</artifactId>

<version>${spark.version}</version>

<scope>provided</scope>

</dependency>


<dependency>

<groupId>io.streamnative.connectors</groupId>

<artifactId>pulsar-spark-connector_${scala.compat.version}</artifactId>

<version>${pulsar-spark-connector.version}</version>

</dependency>

c. 添加包含 pulsar-spark-connector 的 Maven 仓库。

<repositories>


<repository>

<id>central</id>

<layout>default</layout>

<url>https://repo1.maven.org/maven2</url>

</repository>


<repository>

<id>bintray-streamnative-maven</id>

<name>bintray</name>

<url>https://dl.bintray.com/streamnative/maven</url>

</repository>


</repositories>

d. 使用 maven_shade_plugin 将示例类与 pulsar-spark-connector 一同打包。


Spark 读写 Pulsar


示例中的工程包括以下程序:

  1. 从 Pulsar 中读取数据(将该 App 命名为 StreamRead)。

  2. 将数据写入 Pulsar(将该 App 命名为 BatchWrite)。


构建流处理作业,从 Pulsar 读取数据


1. 在 StreamRead 中,创建 SparkSession。

val spark = SparkSession

    .builder()

    .appName("data-read")

    .config("spark.cores.max", 2)

    .getOrCreate()


2. 为了连接至 Pulsar, 需要在构建 DataFrame 时指定 service.url 和 admin.url ,并指定待读取的 topic。

val ds = spark.readStream

    .format("pulsar")

    .option("service.url", "pulsar://localhost:6650")

    .option("admin.url", "http://localhost:8088")

    .option("topic", "topic-test")

    .load()

ds.printSchema()  // 打印 topic-test 的 schema 信息,验证读取成功


3. 将 ds 输出至控制台,启动作业执行。

val query = ds.writeStream

    .outputMode("append")

    .format("console")

    .start()

query.awaitTermination()


将数据写入 Pulsar


1. 同理,在 BatchWrite 中,首先创建 SparkSession。

val spark = SparkSession

    .builder()

    .appName("data-sink")

    .config("spark.cores.max", 2)

    .getOrCreate()


2. 创建 1-10 的列表,并将其转化为 Spark Dataset,写入 Pulsar。

import spark.implicits._

spark.createDataset(1 to 10)

    .write

    .format("pulsar")

    .option("service.url", "pulsar://localhost:6650")

    .option("admin.url", "http://localhost:8088")

    .option("topic", "topic-test")

    .save()


运行程序


首先配置、启动 Spark 和 Pulsar 的单节点集群,再将示例项目打包,并通过 spark-submit 分别提交两个作业,最后观察程序的执行结果。


1

修改 Spark 的日志级别 (可选)。

cd ${spark.dir}/conf

cp log4j.properties.template log4j.properties

在文本编辑器中,将日志级别改为 WARN 。

log4j.rootCategory=WARN, console

2

启动 Spark 集群。

cd ${spark.dir}

sbin/start-all.sh

3

修改 Pulsar WebService 端口为 8088(编辑 ${pulsar.dir}/conf/standalone.conf),避免和 Spark 端口冲突。

webServicePort=8088

4

启动 Pulsar 集群。

bin/pulsar standalone

5

打包示例项目。

cd ${connector_test.dir}

mvn package

6

启动 StreamRead 监控 topic-test 中的数据变化。

${spark.dir}/bin/spark-submit --class com.example.StreamRead --master spark://localhost:7077 ${connector_test.dir}/target/connector-test-1.0-SNAPSHOT.jar

7

在另一个终端窗口中,启动 BatchWrite 向 topic-test 一次性写入 1-10 的数字。

${spark.dir}/bin/spark-submit --class com.example.BatchWrite --master spark://localhost:7077 ${connector_test.dir}/target/connector-test-1.0-SNAPSHOT.jar

8

这时,可以在 StreamRead 所在的终端中得到类似的输出。


至此,我们搭建了 Pulsar 和 Spark 集群,构建了示例项目的框架,使用 Pulsar Spark Connector 完成了从 Spark 读取 Pulsar 数据和向 Pulsar 写入 Spark 数据的操作,提交了最终程序测试。


程序的完整示例,请参阅

https://github.com/yjshen/connector-test。


更多关于 Pulsar 的技术干货和产品动态,请关注 ApachePulsar 微信公众号。

点击“阅读原文”,查看程序完整示例。

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存